package com.luke.dt.user.listener;

import com.luke.dt.commons.entitys.shared.TblDtJudeDuplicate;
import com.luke.dt.user.service.JudeDuplicateService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import java.io.IOException;

/**
 * 参考：https://blog.csdn.net/linpeng_1/article/details/80505828
 * https://blog.csdn.net/azhegps/article/details/53815201
 * rabbitmq消费者（消费指定队列）
 *
 * 这里需要注意一个问题：
 * 假设A服务发送过来的消息顺序是msgA、msgB、msgC
 * 那么B服务在只有单实例也会顺序接收到msgA、msgB、msgC
 * 但是如果B服务在接收到msgA后，处理业务逻辑出现错误，将消息会送给消息队列。
 * 此时消息队列拿到消息后再从新投送的时候，B服务已经处理了msgB和msgC
 * 那么此时B服务的消息处理顺序就变成了msgB、msgC、msgA
 * 所以需要保证消费顺序的，应由业务自己去处理。
 *
 */
@Slf4j
@Component
@RabbitListener(queues = "dt_queue_user")
public class DtQueueListener {

    @Autowired(required = false)
   private JudeDuplicateService judeDuplicateService;

    @RabbitHandler
    public void handleDtQueueMsg(String msg, Channel channel, Message message){
        log.info("=======处理队列数据开始=======");
        try {
            TblDtJudeDuplicate dbJudeDuplicate = judeDuplicateService.handleReduUserBalance(msg);
            log.info("JudeDuplicate staus:{}",dbJudeDuplicate.getStatusVal());
            if(dbJudeDuplicate.getStatusVal() == 2){//处理成功
                //消息消费成功，返回ack
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            }else{
                //消息消费失败，更新防重表
                Integer handNum = judeDuplicateService.handleMsgFail(msg);
                if(handNum >= 3){
                    //返回ack，人工介入手动处理该消息
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);//消息不返回给队列（丢弃）
                }else{
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);//消息返回给队列
                }
            }
        } catch (Exception e) {
            try {
                //处理消费队列消息失败
                log.error("处理队列数据业务出现出现错误：{}",e.getMessage());
                Integer handNum = judeDuplicateService.handleMsgFail(msg);
                if(handNum >= 3){
                    //返回ack，人工介入手动处理该消息
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);//消息不返回给队列（丢弃）
                }else{
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);//消息返回给队列
                }
            } catch (Exception e1) {
                log.error("更新防重表出现错误：{}",e1.getMessage());
                try {
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);//消息返回给队列
                } catch (IOException e2) {
                    log.error("返回ack消息出现错误:{}",e2.getMessage());
                }
            }
        }
        log.info("=======处理队列数据结束=======");
    }
}
